Pair trading is a quantitative trading strategy commonly used in hedge funds and investment banks. A major aspect of this strategy is the spread analysis/prediction for a pair of selected stocks.
The principle is as follows.
Let's say you have a pair of securities X and Y that have some underlying economic link. An example might be two companies that manufacture the same product, or two companies in one supply chain. If we can model this economic link with a mathematical model, we can make trades on it.
#Importing Dependencies
import numpy as np
import pandas as pd
import os
import glob
import pickle as pickle
import json
import missingno
import itertools
from itertools import cycle
from datetime import datetime
from builtins import object
from datetime import date
from dateutil.relativedelta import relativedelta
from collections import defaultdict
from concurrent import futures
import pandas_datareader.data as web
import warnings
warnings.filterwarnings('ignore')
#visualization dependencies
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns
from plotly import graph_objects as go
from plotly import offline as py
import plotly.express as px
%matplotlib inline
#machine learning/statistical dependencies
import statsmodels.api as sm
from kneed import KneeLocator
from sklearn import metrics
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, silhouette_score
from sklearn.cluster import KMeans, AgglomerativeClustering, AffinityPropagation
from sklearn.manifold import TSNE
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
from statsmodels.tsa.arima_model import ARIMA
from pykalman import KalmanFilter
from keras.models import Sequential, Model
from keras.layers import Dense, LSTM, Dropout
import scipy.cluster.hierarchy as shc
We scrape the web to gather the financial data of S&P 500 companies for last 5 years from current date to be utilized for the proposed pair trading strategy implementation.
Data Sources
The ticker, sector and industry metadata for S&P 500 companies was scraped from Wikipedia. (https://en.wikipedia.org/wiki/List_of_S%26P_500_companies)
The financial information for each company was gathered using python's pandas_datareader library using the company tickers as identifiers.
We developed the class gather_data, which encapsulates all the functions utilized for the data gathering process.
Quick overview of the functions:
download_s_and_p_stats(): Used to fetch S&P 500 company metadata (ticker, sector, industry) from Wikipedia.
download_stock_financials(): Used to fetch financial data (open, low, high, close, volume, adj close) given a stock ticker.
MultiThreader(): Used to speed up the process of fetching stock data using multi threading.
combine_stock_data(): Used to merge all the stock CSV data files into one combined file.
The data gathering process consists of the following steps:
The MultiThreader() is called through a class object. It utilizes the download_s_and_p_stats() to get the stock metadata. This metadata is then passed to the download_stock_financials() with 50 maximum parallel workers.
The download_stock_financials() extracts the ticker from each stock's metadata. This ticker is used with the start and end dates to fetch financial data of the stock for that time period.
The financial data is merged with the metadata for each stock and a CSV file for each stock is created and stored at the given location.
We utilize the combine_stock_data() to merge all the gathered stock CSV files into one dataframe and optionally store it in the given location
class gather_data(object):
    '''
    Encapsulates the data gathering pipeline for S&P 500 stocks:
    metadata scraping from Wikipedia, per-ticker financial downloads
    (threaded), and merging of the per-stock CSV files.
    '''
    def __init__(self):
        super(gather_data, self).__init__()
        #The start date for historical data (5 years back from today)
        self.start_time = str(date.today()- relativedelta(years=5))
        #The end date for historical data (today)
        self.end_time = str(date.today())

    def download_s_and_p_stats(self):
        '''
        Fetches company ticker, sector and industry data for
        S&P 500 companies from Wikipedia.

        Returns:
            DataFrame with 'Symbol', 'GICS Sector', 'GICS Sub-Industry' columns.
        '''
        data = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')
        df = data[0].reset_index()
        #Extracting required features
        df = df[['Symbol','GICS Sector','GICS Sub-Industry']]
        #Yahoo uses '-' where Wikipedia uses '.' in tickers (e.g. BRK.B -> BRK-B);
        #str.replace is a no-op when there is no '.', so no conditional is needed
        df['Symbol'] = [sym.replace('.','-') for sym in list(df['Symbol'])]
        return df

    def download_stock_financials(self,stock):
        '''
        Takes a stock metadata row and fetches the related financial data
        between self.start_time and self.end_time, then writes one CSV
        under self.op_path (set by MultiThreader before this is called).

        Inputs
        stock-- sequence (ticker, sector, sub-industry); stock[0] is the ticker
        '''
        try:
            stock_data = web.DataReader(stock[0],'yahoo', self.start_time, self.end_time)
            stock_data[['Symbol','GICS Sector','GICS Sub-Industry']] = pd.DataFrame([stock], index=stock_data.index)
            output_name = stock[0] + '_data.csv'
            stock_data.to_csv(self.op_path +str(output_name))
        except Exception:
            # Fix: a bare `except:` also swallowed KeyboardInterrupt/SystemExit.
            # Downloads are best-effort: log the failing ticker and keep going.
            print('bad: %s' % (stock[0]))

    def MultiThreader(self, op_path):
        '''
        Downloads financials for every S&P 500 ticker in parallel using
        concurrent.futures.ThreadPoolExecutor (the downloads are I/O bound).

        Input
        op_path-- The path under which the per-stock CSV files are stored
        '''
        self.op_path = op_path
        max_workers = 50
        sp_stocks = self.download_s_and_p_stats()
        #To handle the case when fewer stocks than threads are passed
        workers = min(max_workers, len(sp_stocks))
        #map submits all tasks immediately; the with-block waits for completion
        with futures.ThreadPoolExecutor(workers) as executor:
            executor.map(self.download_stock_financials, sp_stocks.values)

    def combine_stock_data(self,curr_path, files_path, output_path = None, save_csv=False):
        '''
        Function to combine the per-stock CSV files into one DataFrame.

        Inputs:
        curr_path-- Path for the current working directory (restored on exit)
        files_path-- Path where the files to be merged are stored
        output_path-- Path to store the merged CSV file (default=None)(optional)
        save_csv-- Boolean specifying if the CSV file should be stored (default=False)(optional)

        Returns:
            The concatenated DataFrame of all per-stock CSVs.
        '''
        #changing the working directory to files path
        os.chdir(files_path)
        #fetching all file names with a '.csv' extension
        file_extension = '.csv'
        all_filenames = [i for i in glob.glob(f"*{file_extension}")]
        #combining the files
        combined_stocks_data = pd.concat([pd.read_csv(f) for f in all_filenames])
        if save_csv:
            #changing working directory to the given output path
            os.chdir(output_path)
            #saving our combined csv data
            combined_stocks_data.to_csv('combined_stocks_data.csv')
        #changing working directory back to original
        os.chdir(curr_path)
        return combined_stocks_data
if __name__ == '__main__':
    #defining working directory paths
    curr_path = os.getcwd()
    files_path = curr_path + '/Data/Individual Stocks/'
    output_path = curr_path + '/Data/'
    #declaring gather_data class object
    gd_obj = gather_data()
    #downloading stock data (writes one CSV per ticker into files_path)
    gd_obj.MultiThreader(files_path)
    #combining the data files into one DataFrame (also saved under output_path)
    combined_data = gd_obj.combine_stock_data(curr_path,files_path, output_path, True)

#displaying the first rows of the merged data
combined_data.head()
#Shape of the combined data (rows, columns)
np.shape(combined_data)
The data preprocessing part consists of 2 steps:
Feature Engineering:
We apply feature engineering on the data to extract multiple technical indicators which might help us to understand and realize the link between two stocks.
For this we make use of generate_technical_indicator() from the feature_engineering class to derive and add additional features to the given stock financials data.
Data Cleaning: The NA values are handled and the data is standardized to prepare it for further analysis
$$\text{RS} = \frac{AvgGain_{prev} \times 13 + CurrentGain}{AvgLoss_{prev} \times 13 + CurrentLoss}$$
$$\text{accdist} = accdist_{prev} + volume * CLV $$
Average Directional Movement Index - It is an indicator of the strength of the trend for a given stock. It is composed of two indicators, the positive direction indicator (+DI) and the negative direction indicator (-DI)
Moving Average Convergence Divergence - It is a trend-following momentum indicator that shows the relationship between two moving averages of prices in a given time window
Daily returns - The percentage gain over the previous day's adjusted closing price
Log returns - The logarithmic return over the previous day's closing price
Reference: Wikipedia
class feature_engineering(object):
    '''
    Derives technical-indicator features from raw stock financial data.

    NOTE(review): this class relies on the `ta` technical-analysis package,
    which is never imported anywhere in this file -- confirm `import ta`
    exists in the execution environment.
    '''
    def __init__(self):
        super(feature_engineering, self).__init__()

    def generate_technical_indicator(self,data_df):
        '''
        Function to generate additional technical indicators for the stock.

        Input:
        data_df-- Dataframe containing stock financials data (the code reads
                  the columns 'Open' is not used; it reads 'High', 'Low',
                  'Close', 'Adj Close' and 'Volume')

        Output:
        The same DataFrame with the indicator columns added. Note that `df`
        aliases `data_df`, so the caller's DataFrame is modified in place.

        NOTE(review): at the call site this receives the concatenated data of
        ALL tickers; rolling/window indicators would then be computed across
        symbol boundaries -- confirm per-symbol grouping is handled upstream.
        '''
        # 1. Momentum Indicators
        # Relative Strength Index
        df = data_df
        df['rsi'] = ta.momentum.rsi(df['Close'], window=14)
        # Kaufman's Adaptive Moving Average (KAMA)
        df['kama'] = ta.momentum.kama(df['Close'],window=14)
        # 2. Volume Indicators
        # Accumulation/Distribution Index (ADI)
        df['adi'] = ta.volume.acc_dist_index(df['High'], df['Low'], df['Close'], df['Volume'])
        # Volume-price trend (VPT)
        df['vpt'] = ta.volume.volume_price_trend(df['Close'], df['Volume'])
        # 3. Volatility Indicators
        # Average True Range (ATR)
        df['atr'] = ta.volatility.average_true_range(df['High'], df['Low'],df['Close'], window=14)
        # Bollinger Bands (BB) N-period simple moving average (MA)
        df['bb_ma'] = ta.volatility.bollinger_mavg(df['Close'], window=20)
        # 4. Trend Indicators
        # Average Directional Movement Index (ADX)
        df['adx'] = ta.trend.adx(df['High'], df['Low'], df['Close'], window=14)
        # Exponential Moving Average
        df['ema'] = ta.trend.ema_indicator(df['Close'], window=14)
        # Moving Average Convergence Divergence (MACD)
        df['macd'] = ta.trend.macd(df['Close'], window_fast=14, window_slow=30)
        # 5. Other Indicators
        # Daily Log Return (DLR)
        df['dlr'] = ta.others.daily_log_return(df['Close'])
        # Daily Returns (percentage change of the adjusted close)
        df['daily_returns'] = df['Adj Close'].pct_change()
        # Moving Averages: 50- and 200-day rolling means of the adjusted close
        averages = [50,200]
        for avg in averages:
            col_name = str(avg) +' Days Average'
            df[col_name] = df['Adj Close'].rolling(window = avg, center = False).mean()
        return df
#declaring feature_engineering class object
fe_obj = feature_engineering()
#Adding technical indicators (also mutates combined_data in place -- see class note)
final_data = fe_obj.generate_technical_indicator(combined_data)
final_data.head()
#Null value check
final_data.isna().sum()
#Cleaning out the NA values (indicators have leading NaNs from their warm-up windows)
final_data = final_data.dropna()
np.shape(final_data)
#Null value check (should now be all zeros)
final_data.isna().sum()
final_data.columns
#Applying Standard Scaling on the continuous features
train_data = final_data[['High', 'Low', 'Open', 'Close', 'Volume', 'Adj Close','rsi', 'kama', 'adi', 'vpt', 'atr',
                         'bb_ma', 'adx', 'ema', 'macd', 'dlr', 'daily_returns',
                         '50 Days Average', '200 Days Average']]
standard_df = StandardScaler().fit_transform(train_data)
standard_df.shape
#Null value check
train_data.isna().sum()
Below is the data created after initial data scraping and feature engineering. It consists of 5 year daily data for all the S&P 500 tickers
#Loading the data created after scraping + feature engineering (5y daily data, all S&P 500 tickers)
final_data = pd.read_csv('/Users/prakharpatidar/Google Drive/DS/InfoViz DS 5500/Data/Work_data/final_data.csv', index_col = 0)
final_data = final_data.dropna().reset_index(drop=True)
final_data.head()
final_data.columns
#Pivoting to wide format: one Adj Close column per ticker, indexed by Date
df = final_data.pivot(index='Date', columns='Symbol', values='Adj Close')
df.head()
exp_data = df.copy()
pd.set_option('precision', 3)
exp_data.describe().T.head()
exp_data.isnull().values.any()
#visualizing missing values
missingno.matrix(exp_data)
print('Data Shape before cleaning =', exp_data.shape)
#fraction of missing values per ticker, worst first
missing_percentage = exp_data.isnull().mean().sort_values(ascending=False)
missing_percentage.head(10)
#dropping tickers with more than 20% missing history
dropped_list = sorted(list(missing_percentage[missing_percentage > 0.2].index))
exp_data.drop(labels=dropped_list, axis=1, inplace=True)
print('Data Shape after cleaning =', exp_data.shape)
#imputing the remaining gaps with each column's median
exp_data = exp_data.fillna(exp_data.median())
exp_data
missingno.matrix(exp_data)
#annualized mean return and volatility
# NOTE(review): 266 trading days per year is assumed here -- 252 is the usual convention; confirm
returns = exp_data.pct_change().mean()*266
returns = pd.DataFrame(returns)
returns.columns = ['returns']
returns['volatility'] = exp_data.pct_change().std()*np.sqrt(266)
ret_data = returns
ret_data.head()
#Prepare the scaler
scale = StandardScaler().fit(ret_data)
#Fit the scaler
# NOTE(review): fit_transform refits the already-fitted scaler on the same data -- redundant but harmless here
scaled_data = pd.DataFrame(scale.fit_transform(ret_data),columns = ret_data.columns, index = ret_data.index)
X = scaled_data
X.head()
Cluster Analysis is a technique used to group sets of objects that share similar characteristics. It has been prominently used in the finance domain by investors to develop a cluster trading approach to help them build a diversified portfolio, helping protect the investor’s portfolio against systemic risks that could make the portfolio vulnerable to losses.
Here we explore three different clustering techniques for extracting different clusters of stocks exhibiting minimal correlation from one another. The three techniques are compared based on Silhouette Scores and the clusters from the technique with highest score is selected for further analysis. The pairs in each cluster are tested for cointegration via the ADF test giving us optimal pairs for our pair trading strategy.
K-Means is an iterative algorithm that partitions the dataset into k distinct non-overlapping clusters where each datapoint belongs to exactly one cluster. It tries to cluster the most similar (homogeneous) points together while also maximizing the distance between individual clusters. The elbow method iterates through values of k and calculates the distortion and inertia for each value of k in the specified range, where distortion is the average of
the squared distances of each feature to the closest cluster center. We use Silhouette method to reaffirm the optimal number of clusters (K). The K-Means clustering model is trained using the found optimal K value to generate clusters for our pair selection.
#Elbow method: compute the inertia (distortion) for k = 1..14
K = range(1,15)
distortions = []
#Fit the method
for k in K:
    kmeans = KMeans(n_clusters = k)
    kmeans.fit(X)
    distortions.append(kmeans.inertia_)
#Plot the results
fig = plt.figure(figsize= (15,5))
plt.plot(K, distortions, 'bx-')
plt.xlabel('Values of K')
plt.ylabel('Distortion')
plt.title('Elbow Method')
plt.grid(True)
plt.show()
#locating the elbow of the distortion curve automatically
kl = KneeLocator(K, distortions, curve="convex", direction="decreasing")
kl.elbow
#For the silhouette method k needs to start from 2
K = range(2,15)
silhouettes = []
#Fit the method
for k in K:
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10, init='random')
    kmeans.fit(X)
    silhouettes.append(silhouette_score(X, kmeans.labels_))
#Plot the results
fig = plt.figure(figsize= (15,5))
plt.plot(K, silhouettes, 'bx-')
plt.xlabel('Values of K')
plt.ylabel('Silhouette score')
plt.title('Silhouette Method')
plt.grid(True)
plt.show()
# NOTE(review): silhouette curves are not necessarily convex/decreasing, so this
# KneeLocator configuration may return None -- confirm against the plot above
kl = KneeLocator(K, silhouettes, curve="convex", direction="decreasing")
print('Suggested number of clusters: ', kl.elbow)
#chosen number of clusters for the final model
c = 8
#Fit the model
k_means = KMeans(n_clusters=c)
k_means.fit(X)
prediction = k_means.predict(X)
#Plot the results: tickers in (mean return, volatility) space, colored by cluster
centroids = k_means.cluster_centers_
fig = plt.figure(figsize = (18,10))
ax = fig.add_subplot(111)
scatter = ax.scatter(X.iloc[:,0],X.iloc[:,1], c=k_means.labels_, cmap="rainbow", label = X.index)
ax.set_title('k-Means Cluster Analysis Results')
ax.set_xlabel('Mean Return')
ax.set_ylabel('Volatility')
plt.colorbar(scatter)
#centroids drawn as green squares
plt.plot(centroids[:,0],centroids[:,1],'sg',markersize=10)
plt.show()
#series mapping ticker -> cluster label
clustered_series = pd.Series(index=X.index, data=k_means.labels_.flatten())
clustered_series_all = pd.Series(index=X.index, data=k_means.labels_.flatten())
#k-means never emits -1 labels; this filter is a no-op kept for safety
clustered_series = clustered_series[clustered_series != -1]
plt.figure(figsize=(12,8))
plt.barh(range(len(clustered_series.value_counts())),clustered_series.value_counts())
plt.title('Clusters')
plt.xlabel('Stocks per Cluster')
plt.ylabel('Cluster Number')
plt.show()
counts = clustered_series.value_counts()
#clusters small enough to visualize (between 2 and 19 members)
cluster_vis_list = list(counts[(counts<20) & (counts>1)].index)[::-1]
clust1 = pd.DataFrame(clustered_series,columns=['label'])
grouped_df = clust1.groupby('label')
#wide format: one row per ticker, one column per date
pivot_data = final_data.pivot(index='Symbol', columns='Date', values='Adj Close')
pivot_data.head()
#visualizing time series for clusters in K-means
for key, item in grouped_df:
    print('Group: ',key)
    grp_df = grouped_df.get_group(key)
    Syms = grp_df.index
    #log of the cluster members' mean price, and log of the de-meaned prices
    means = np.log(pivot_data.loc[Syms,].T.mean())
    temp = np.log(pivot_data.loc[Syms,].T.sub(means))
    # temp.plot()
    # plt.show()
    temp = temp.reset_index()
    col_lst = temp.columns.values.tolist()
    # NOTE(review): list.remove() returns None, so col_lst becomes None here;
    # melt(value_vars=None) then melts every non-id column, which happens to be
    # the intended set -- confirm before relying on col_lst elsewhere
    col_lst = col_lst.remove('Date')
    df_melt = temp.melt(id_vars='Date', value_vars=col_lst)
    fig = px.line(df_melt, x='Date', y='value',color='Symbol')
    fig.show()
Hierarchical Clustering is an unsupervised algorithm that groups similar objects into distinct clusters. The method performs the clustering by creating a tree of clusters, grouping and separating features on each iteration. The product of the clustering process is visualized in a figure known as a "dendrogram". The grouping can be performed by an agglomerative (bottom-up) or divisive (top-down) approach. The clustering technique can be applied using different similarity measures such as Ward, Average, Complete or Single Linkage. The main benefit of this algorithm is that, unlike K-Means, it doesn't require us to specify the number of clusters in advance.
Since we want to minimize the variance distance between our clusters, for our purpose, we apply Hierarchical Clustering using Ward linkage. A distance threshold of 13.5, selected using hyper parameter tuning, is used for extracting the number of clusters for training the Hierarchical (Agglomerative) Clustering model.
#plotting dendrogram of Ward-linkage agglomerative clustering
plt.figure(figsize=(15, 10))
plt.title("Dendrograms")
dend = shc.dendrogram(shc.linkage(X, method='ward'))
#plotting cutoff threshold for dendrogram
plt.figure(figsize=(15, 10))
plt.title("Dendrogram")
dend = shc.dendrogram(shc.linkage(X, method='ward'))
#distance threshold of 13.5 chosen via tuning; it implies 4 clusters below
plt.axhline(y=13.5, color='purple', linestyle='--')
#Fit the model with the number of clusters read off the dendrogram cut
clusters = 4
hc = AgglomerativeClustering(n_clusters= clusters, affinity='euclidean', linkage='ward')
labels = hc.fit_predict(X)
#Plot the results in (mean return, volatility) space
fig = plt.figure(figsize=(15,10))
ax = fig.add_subplot(111)
scatter = ax.scatter(X.iloc[:,0], X.iloc[:,1], c=labels, cmap='rainbow')
ax.set_title('Hierarchical Clustering Results')
ax.set_xlabel('Mean Return')
ax.set_ylabel('Volatility')
plt.colorbar(scatter)
plt.show()
Affinity Propagation Clustering is a method that creates clusters by a criterion of how well suited an instance is to be a representative of another. This can be imagined as instances messaging each other on how much they suit one another. After that, an instance that received messages from multiple senders will send back the revised value of attractiveness to each sender. This messaging will proceed until an agreement is reached. When a sender gets associated with the receiver the receiver will become the exemplar. All data points with the same exemplar will then create a cluster. The advantage of using Affinity Propagation Clustering is that it determines the clusters by itself in fully unsupervised manner.
#Fit the model (Affinity Propagation chooses the number of clusters itself)
ap = AffinityPropagation()
ap.fit(X)
labels1 = ap.predict(X)
#Plot the results in (mean return, volatility) space
fig = plt.figure(figsize=(15,10))
ax = fig.add_subplot(111)
scatter = ax.scatter(X.iloc[:,0], X.iloc[:,1], c=labels1, cmap='rainbow')
ax.set_title('Affinity Propagation Clustering Results')
ax.set_xlabel('Mean Return')
ax.set_ylabel('Volatility')
plt.colorbar(scatter)
plt.show()
#Extract the cluster centers and labels
cci = ap.cluster_centers_indices_
labels2 = ap.labels_
#Print their number
clusters = len(cci)
print('The number of clusters is:',clusters)
#Plot the results: members, exemplar, and spokes per cluster
X_ap = np.asarray(X)
plt.close('all')
plt.figure(1)
# NOTE(review): plt.clf is referenced but not called (missing parentheses) -- a no-op as written
plt.clf
fig=plt.figure(figsize=(15,10))
#cycled color codes, one per cluster
colors = cycle('cmykrgbcmykrgbcmykrgbcmykrgb')
for k, col in zip(range(clusters),colors):
    cluster_members = labels2 == k
    cluster_center = X_ap[cci[k]]
    plt.plot(X_ap[cluster_members, 0], X_ap[cluster_members, 1], col + '.')
    plt.plot(cluster_center[0], cluster_center[1], 'o', markerfacecolor=col, markeredgecolor='k', markersize=12)
    #line from the exemplar to each member of its cluster
    for x in X_ap[cluster_members]:
        plt.plot([cluster_center[0], x[0]], [cluster_center[1], x[1]], col)
plt.show()
Model Selection
#Compare the three clustering techniques by silhouette score (higher is better)
print("k-Means Clustering", metrics.silhouette_score(X, k_means.labels_, metric='euclidean'))
# NOTE(review): hc.fit_predict(X) refits the model here; the labels from the earlier fit could be reused
print("Hierarchical Clustering", metrics.silhouette_score(X, hc.fit_predict(X), metric='euclidean'))
print("Affinity Propagation Clustering", metrics.silhouette_score(X, ap.labels_, metric='euclidean'))
#keep clusters with at least 2 tickers (upper bound is effectively unlimited)
cluster_size_limit = 1000
counts = clustered_series.value_counts()
ticker_count = counts[(counts>1) & (counts<=cluster_size_limit)]
print ("Number of clusters: %d" % len(ticker_count))
#ordered-pair count: n*(n-1) per cluster of size n
print ("Number of Pairs: %d" % (ticker_count*(ticker_count-1)).sum())
A Cointegration test is used to explore if there is a correlation between several time series in the long term. The test was introduced as an alternative to linear regression for analyzing time series to solve the issue of spurious correlation. The tests identify scenarios where two or more non-stationary time series are integrated together in a way that they cannot deviate in the long term. The tests are used to identify the degree of sensitivity of two variables to the same average price over a specified period of time.
We apply Cointegrated Augmented Dickey Fuller test for testing the stationarity of stocks followed by Cointegration testing on stocks within a cluster to find cointegrated pairs which could be used for training our spread prediction models.
#defining function for cointegrated augmented dickey fuller test
def find_cadf_pairs(data, significance=0.05):
    '''
    Cointegrated Augmented Dickey-Fuller (CADF) pair scan.

    For every pair (i, j) with i < j, regresses series i on series j
    (OLS without an intercept) and runs an ADF stationarity test on the
    regression residuals. Pairs whose ADF p-value falls below
    `significance` are treated as cointegrated.

    Inputs:
    data-- DataFrame with one column of prices per ticker
    significance-- ADF p-value threshold for keeping a pair
                   (default=0.05, the previously hard-coded value)

    Returns:
    score_matrix-- upper-triangular matrix of ADF test statistics
    pvalue_matrix-- matrix of ADF p-values (ones where untested)
    pairs-- dict mapping (ticker_i, ticker_j) -> full ADF result tuple
    '''
    n = data.shape[1]
    score_matrix = np.zeros((n, n))
    pvalue_matrix = np.ones((n, n))
    keys = data.keys()
    pairs = {}
    for i in range(n):
        for j in range(i+1, n):
            S1 = data[keys[i]]
            S2 = data[keys[j]]
            #residual spread: S1 minus its OLS projection onto S2
            ols = sm.OLS(S1,S2).fit()
            pred = ols.predict(S2)
            adf = ts.adfuller(S1-pred)
            score = adf[0]
            pvalue = adf[1]
            score_matrix[i, j] = score
            pvalue_matrix[i, j] = pvalue
            if pvalue < significance:
                pairs[(keys[i], keys[j])] = adf
    return score_matrix, pvalue_matrix, pairs
#extracting cointegrated pairs
adf_dict = {}
for i, clust in enumerate(ticker_count.index):
tickers = clustered_series[clustered_series == clust].index
score_matrix, pvalue_matrix, adf_pairs = find_cadf_pairs(data1[tickers])
adf_dict[clust] = {}
adf_dict[clust]['score_matrix'] = score_matrix
adf_dict[clust]['pvalue_matrix'] = pvalue_matrix
adf_dict[clust]['pairs'] = adf_pairs
adf_pairs = []
for cluster in adf_dict.keys():
adf_pairs.extend(adf_dict[cluster]['pairs'])
print ("Number of pairs:", len(adf_pairs))
print ("In those pairs, we found %d unique tickers." % len(np.unique(adf_pairs)))
print(adf_pairs)
#loading in previously saved adf pairs
with open('/Users/prakharpatidar/Google Drive/DS/InfoViz DS 5500/Data/Work_data/cadf_pairs.pkl', 'rb') as f:
cadf_dict = pickle.load(f)
adf_pairs = []
for cluster in cadf_dict.keys():
adf_pairs.extend(cadf_dict[cluster]['pairs'])
print ("Number of pairs:", len(adf_pairs))
print ("In those pairs, we found %d unique tickers." % len(np.unique(adf_pairs)))
#TSNE visualization
def plot_tsne(pairs):
    '''
    Project the clustering features of all pair-member stocks to 2-D with
    t-SNE, draw a line between the two members of each cointegrated pair,
    and annotate every point with its ticker.

    Input:
    pairs-- iterable of (ticker1, ticker2) tuples

    Relies on the globals X (scaled feature matrix indexed by ticker) and
    clustered_series (ticker -> cluster label, used for point colors).
    '''
    #generating data for tsne
    stocks = np.unique(pairs)
    X_data = pd.DataFrame(index=X.index, data=X).T
    in_pairs_series = clustered_series.loc[stocks]
    stocks = list(np.unique(pairs))
    X_pairs = X_data.T.loc[stocks]
    #applying tsne
    X_tsne = TSNE(learning_rate=30, perplexity=5, random_state=42, n_jobs=-1).fit_transform(X_pairs)
    #visualizing tsne
    plt.figure(1, facecolor='white',figsize=(18,15))
    plt.clf()
    plt.axis('off')
    #line between the two members of each pair
    #(fix: the original assigned ticker1/ticker2 here, with ticker2 a
    # copy-paste of pair[0]; both variables were unused and are removed)
    for pair in pairs:
        loc1 = X_pairs.index.get_loc(pair[0])
        x1, y1 = X_tsne[loc1, :]
        loc2 = X_pairs.index.get_loc(pair[1])
        x2, y2 = X_tsne[loc2, :]
        plt.plot([x1, x2], [y1, y2], 'k-', alpha=0.3, c='b');
    #scatter all stocks, colored by their cluster label
    plt.scatter(X_tsne[:, 0], X_tsne[:, 1], s=215, alpha=0.8, c=in_pairs_series.values, cmap=cm.Paired)
    plt.title('TSNE Visualization of Pairs');
    # Annotate each projected point with its ticker
    for x,y,name in zip(X_tsne[:,0],X_tsne[:,1],X_pairs.index):
        plt.annotate(name,
                     (x,y),
                     textcoords="offset points",
                     xytext=(0,10),
                     ha='center')
    plt.show()
#plotting TSNE for pairs in just one cluster (cluster 5)
plot_tsne(list(cadf_dict[5]['pairs'].keys()))
#plotting TSNE for pairs in all clusters
plot_tsne(adf_pairs)
#reloading the feature-engineered long-format data
final_data = pd.read_csv('/Users/prakharpatidar/Google Drive/DS/InfoViz DS 5500/Data/Work_data/final_data.csv', index_col = 0)
final_data = final_data.dropna().reset_index(drop=True)
final_data.head()
#Heatmap visualization of each cluster's pairwise ADF p-values (8 clusters in a 4x2 grid)
fig, axs = plt.subplots(4,2, figsize=(12,12))
fig.subplots_adjust(hspace = .5, wspace=.1)
clust = 0
for i in range(4):
    for j in range(2):
        #mask hides the untested cells (initialized to 1.0 in find_cadf_pairs)
        sns.heatmap(cadf_dict[clust]['pvalue_matrix'], cmap='RdYlGn_r'
        , mask = (cadf_dict[clust]['pvalue_matrix'] >= 0.98), ax=axs[i][j])
        axs[i][j].set_title(str('Cluster '+ str(clust)))
        clust +=1
fig.suptitle("P-value heatmap per cluster")
plt.show()
#Finding the lowest p-value pair for each cluster
pairs_per_cluster_dict = defaultdict(lambda: defaultdict(list))
for i in cadf_dict.keys():
    #finding the "best" p_value row for the cluster
    # NOTE(review): this sorts matrix *rows* by their sorted contents and then
    # indexes the pairs dict by the row position -- row indices do not
    # necessarily correspond to the pairs-dict ordering; verify the selection.
    best_pvalue = sorted(cadf_dict[i]['pvalue_matrix'], key = lambda kv: sorted(kv))[0]
    #finding index of that row in the matrix
    idx_pvalue = next(idx for idx,item in enumerate(cadf_dict[i]['pvalue_matrix']) if (item == best_pvalue).all())
    if len(list(cadf_dict[i]['pairs'].keys())) !=0:
        pairs_per_cluster_dict[i]['stock_pair'] = list(cadf_dict[i]['pairs'].keys())[idx_pvalue]
        pairs_per_cluster_dict[i]['pvalue_matrix'] = best_pvalue
        pairs_per_cluster_dict[i]['score_matrix'] = cadf_dict[i]['score_matrix'][0][idx_pvalue]
#displaying best pairs for all clusters
[pairs_per_cluster_dict[clust]['stock_pair'] for clust in cadf_dict.keys()]
We pick one of the above pairs for further analysis.
Taking,
Stock 1 (P1) = ABBV and
Stock 2 (P2) = MNST
The create_timeseries() function creates a timeseries for a given pair of stocks P1 and P2, utilizing the features of each stock suffixed by P1 and P2. It merges the features of each stock by Date to create a wide format single dataframe
def create_timeseries(final_data, P1_name, P2_name):
    '''
    Build a wide-format time series for one stock pair.

    Subsets the long-format data for each ticker, prefixes every feature
    column with 'P1_' / 'P2_', inner-joins the two subsets on Date and
    indexes the result by date.

    Inputs:
    final_data-- long-format DataFrame with 'Symbol', 'Date' and the
                 feature columns ('GICS Sector'/'GICS Sub-Industry' are dropped)
    P1_name-- ticker of the first stock in the pair
    P2_name-- ticker of the second stock in the pair

    Returns:
    DataFrame indexed by P1_Date with P1_*/P2_* feature columns; dates
    present for only one of the two stocks are dropped by the inner merge.
    '''
    cols = final_data.columns.drop(['GICS Sector', 'GICS Sub-Industry', 'Symbol'])
    # subset the original data for the selected pair; .copy() so renaming the
    # columns below cannot warn about (or write through) a view of final_data
    P1_data = final_data[final_data['Symbol']==P1_name][cols].copy()
    P2_data = final_data[final_data['Symbol']==P2_name][cols].copy()
    # rename columns to identify which stock each feature belongs to
    P1_data.columns = ['P1_' + s for s in cols]
    P2_data.columns = ['P2_' + s for s in cols]
    # merge both on Date to create a single wide-format time series
    pair_data = pd.merge(P1_data, P2_data, left_on='P1_Date', right_on='P2_Date')
    # index by date and drop the now-redundant date columns
    pair_data.index = pair_data['P1_Date']
    pair_data = pair_data.drop(columns=['P2_Date', 'P1_Date'])
    return pair_data
#the pair chosen for further analysis
P1 = 'ABBV'
P2 = 'MNST'
pair_data = create_timeseries(final_data, P1, P2)
pair_data.head()
#plotting the two close-price series on one figure
pair_data['P1_Close'].plot(figsize=(15,7))
pair_data['P2_Close'].plot(figsize=(15,7))
plt.ylabel('Spread')
plt.xlabel('Date')
plt.title('Spread_Close')
plt.show()
The strategy uses the spread divergence to open/close a position. We open a position with the spread when the spread diverges from 0, hoping that it will converge again to its mean which is 0.
The ground_spread() function is used to run a linear regression $ \log(p_1) = \beta \log(p_2) + \varepsilon$, where $p_1$ and $p_2$ are the daily closing prices of the stocks P1 and P2, the coefficient $\beta$ is the cointegration coefficient and the stochastic term $\varepsilon$ is the spread.
def ground_spread(pair1, pair2):
    '''
    Ground-truth spread between two price series.

    Regresses pair1 on pair2 (OLS without an intercept) and returns the
    residual series pair1 - beta_hat * pair2, where beta_hat is the
    estimated cointegration coefficient.

    Inputs:
    pair1-- price series of the first stock
    pair2-- price series of the second stock

    Returns:
    The spread series, aligned to the inputs' index.
    '''
    # fix: the original line read `est = est = sm.OLS(...)` (duplicated target)
    est = sm.OLS(pair1, pair2).fit()
    #negated so that adding beta*pair2 below subtracts the fitted component
    beta = -est.params[0]
    return pair1 + (pair2 * beta)
def acc_metric(true_value, predicted_value):
    '''
    Root-mean-squared error between true and predicted spread values.

    The original implementation summed the identical full-series MSE m
    times and divided by m (an O(m^2) no-op); this computes the same RMSE
    directly.

    Inputs:
    true_value-- sequence of true values (1-D, or a list of 1-element arrays)
    predicted_value-- sequence of predictions of the same length

    Returns:
    float RMSE.
    '''
    y_true = np.asarray(true_value, dtype=float)
    y_pred = np.asarray(predicted_value, dtype=float)
    #align shapes: treat 1-D inputs as single-output columns, so a 1-D
    #prediction can be compared against (m, 1) targets and vice versa
    if y_true.ndim == 1:
        y_true = y_true.reshape(-1, 1)
    if y_pred.ndim == 1:
        y_pred = y_pred.reshape(-1, 1)
    return np.sqrt(np.mean((y_true - y_pred) ** 2))
#Calculating spread for close prices of both stocks
pair_data['Spread_Close'] = ground_spread(pair_data.P1_Close, pair_data.P2_Close)
pair_data['Spread_Close'].plot(figsize=(15,7))
#horizontal line at the spread's mean (the reversion target)
plt.axhline(pair_data['Spread_Close'].mean())
plt.title('Spread for Ground Truth')
plt.ylabel('Spread')
plt.xlabel('Date')
plt.legend(['Spread_Close'])
plt.show()
#Calculating spread for open prices of both stocks
pair_data['Spread_Open'] = ground_spread(pair_data.P1_Open, pair_data.P2_Open)
#Calculating spread for high prices of both stocks
pair_data['Spread_High'] = ground_spread(pair_data.P1_High, pair_data.P2_High)
#Calculating spread for low prices of both stocks
pair_data['Spread_Low'] = ground_spread(pair_data.P1_Low, pair_data.P2_Low)
#Creating train/val/test split sizes: 90% train; the remainder split evenly,
#with 30 days shifted from validation to test
train_size = int(len(pair_data) * 0.9)
val_size = int((len(pair_data) - train_size) * 0.5) - 30
test_size = len(pair_data) - train_size - val_size
#splitting the data chronologically (no shuffling for time series)
train, val, test = pair_data[0:train_size], pair_data[train_size:train_size + val_size], pair_data[train_size + val_size:len(pair_data)]
print(len(train), len(val), len(test))
# Function for creating dataset with look back
def create_dataset(dataset, look_back=1):
    '''
    Builds (X, y) samples: X is the full feature row at time t, and y is
    column 0 (the target) over the next `look_back` time steps.

    Input:
    dataset-- 2-D numpy array (rows = time steps); column 0 is the target
    look_back-- number of future steps collected as the target (default=1)

    Returns:
    (raw X list, min-max scaled X, raw y list, min-max scaled y)

    NOTE(review): uses the global `scaler` and calls fit_transform on every
    dataset passed in, so the scaler is refit on val/test data as well
    (potential leakage across splits) -- confirm this is intended.
    The print() below looks like a leftover debug statement.
    '''
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        a = dataset[i, :]
        dataX.append(a)
        dataY.append(dataset[(i+1):(i+1+look_back), 0])
    print(len(dataY))
    return dataX, scaler.fit_transform(dataX), dataY, scaler.fit_transform(dataY)
# Creating data for a look back period of S_t-1 and performing the per-split dataset build
look_back = 1
#scales features into [0, 1]; refit inside create_dataset on each split
scaler = MinMaxScaler(feature_range=(0, 1))
# Generate raw and scaled (X, Y) for train / validation / test
X_train_unorm, X_train, Y_train_unorm, Y_train = create_dataset(train.values, look_back)
X_val_unorm, X_val, Y_val_unorm, Y_val = create_dataset(val.values, look_back)
X_test_unorm, X_test, Y_test_unorm, Y_test = create_dataset(test.values, look_back)
We begin with testing ARIMA as our first model and making predictions 1-time step ahead.
ARIMA models are the most general class of models for forecasting on time series.
#Function to fit ARIMA model
def fit_ARIMA(data):
    '''
    Walk-forward ARIMA(1,0,0) forecasting over the test region.

    For each step after the train+validation cut-off, refits an ARIMA on
    all data up to that step and forecasts the next `look_back` values.

    Input:
    data-- 1-D array of spread values (full series; the cut-off comes from
           the globals train_size, val_size and look_back)

    Returns:
    yhat_ARIMA-- list of 1-step-ahead forecasts
    yhat_ARIMA_mse-- flat list of all forecasted values (for RMSE computation)

    NOTE(review): statsmodels.tsa.arima_model.ARIMA is the legacy API
    (removed in statsmodels >= 0.13) -- confirm the pinned statsmodels version.
    '''
    yhat_ARIMA = []
    yhat_ARIMA_mse = []
    for i in range(train_size+val_size, len(data)-1):
        model = ARIMA(data[:i], order=(1,0,0))
        model_fit = model.fit(disp=0)
        temp = []
        #forecast() returns (forecast, stderr, conf_int); [0] is the forecast array
        forecast = (model_fit.forecast(steps=look_back)[0])
        yhat_ARIMA.append(forecast[0])
        for j in range(len(forecast)):
            temp.append(forecast[j])
        yhat_ARIMA_mse.extend(temp)
    return yhat_ARIMA, yhat_ARIMA_mse
#function to normalize a series
def normalize(series):
    '''Return the z-score of `series`: subtract its mean, divide by its std.'''
    mu = np.mean(series)
    sigma = np.std(series)
    return (series - mu) / sigma
#Arima Model on Spread close values
yhat_ARIMA, yhat_ARIMA_mse = fit_ARIMA(pair_data['Spread_Close'].values)
#calculating accuracy metric (RMSE, despite the variable name) against the unscaled test targets
mse_ARIMA = acc_metric(Y_test_unorm, yhat_ARIMA_mse)
# print('ARIMA MSE: ', mse_ARIMA)
#plotting normalized Y_test and Prediction from ARIMA
plt.figure(figsize = (15,8))
plt.plot(pd.Series(normalize([i[0] for i in Y_test_unorm])), label='True Value')
plt.plot(pd.Series(normalize(yhat_ARIMA_mse)), label='Prediction Arima')
plt.title('Spread true vs predictied -- ARIMA')
plt.xlabel('timestep')
plt.ylabel('normalized spread')
plt.legend()
plt.show()
Moving on, we implemented Kalman Filters, which are also very commonly used for predicting time series. The use of Kalman filters has been well studied in the literature for predicting the spread amongst cointegrated stocks.
def KalmanFilterAverage(x):
    '''
    Smooth a price series with a one-dimensional random-walk Kalman filter
    and return the filtered state means as a Series aligned to the input
    index -- a rolling-mean-like estimate of the underlying price level.
    '''
    # local-level model: scalar state, scalar observation, slow-moving state
    smoother = KalmanFilter(transition_matrices = [1],
                            observation_matrices = [1],
                            initial_state_mean = 0,
                            initial_state_covariance = 1,
                            observation_covariance=1,
                            transition_covariance=.01)
    # run the filter over the observed prices; covariances are discarded
    level, _ = smoother.filter(x.values)
    return pd.Series(level.flatten(), index=x.index)
# Kalman filter regression
def KalmanFilterRegression(x,y):
    '''
    Online linear regression of y on x via a Kalman filter.

    The 2-D hidden state (slope, intercept) evolves as a random walk whose
    step size is controlled by delta; each observation row is [x_t, 1].

    Inputs:
    x-- regressor series (e.g. smoothed P1 prices)
    y-- response series (e.g. smoothed P2 prices)

    Returns:
    Array of filtered state means, one (slope, intercept) row per observation.
    '''
    #how much the regression coefficients are allowed to wander per step
    delta = 1e-3
    trans_cov = delta / (1 - delta) * np.eye(2) # How much random walk wiggles
    #observation matrix: one [x_t, 1] row per time step, shaped (T, 1, 2)
    obs_mat = np.expand_dims(np.vstack([[x], [np.ones(len(x))]]).T, axis=1)
    kf = KalmanFilter(n_dim_obs=1, n_dim_state=2, # y is 1-dimensional, (alpha, beta) is 2-dimensional
                      initial_state_mean=[0,0],
                      initial_state_covariance=np.ones((2, 2)),
                      transition_matrices=np.eye(2),
                      observation_matrices=obs_mat,
                      observation_covariance=2,
                      transition_covariance=trans_cov)
    # Use the observations y to get running estimates and errors for the state parameters
    state_means, state_covs = kf.filter(y.values)
    return state_means
#function to forecast values
def forecast_kalman_lookback(Y_test_unorm, forecast, look_back=1):
    """Package Kalman-filter forecasts for MSE scoring.

    Parameters
    ----------
    Y_test_unorm : unnormalized test targets (normalized here before scoring)
    forecast : 1-D array of Kalman-filter spread predictions
    look_back : only ``look_back == 1`` is implemented; any other value
        returns an empty prediction list and an MSE of 0.0 (original behavior)

    Returns ``(forecast, per_step_predictions, mse)`` where each per-step
    prediction is a length-1 ``np.array`` to match the (n_samples, 1) shape
    the other models feed to ``acc_metric``.
    """
    yhat_KF = forecast
    if look_back == 1:
        # idiom fix: wrap each scalar in a length-1 array via a comprehension
        # instead of the original manual temp-list/append loop
        yhat_KF_mse = [np.array([value]) for value in forecast]
        mse = acc_metric(normalize(Y_test_unorm), yhat_KF_mse)
    else:
        yhat_KF_mse = []
        mse = 0.0
    return yhat_KF, yhat_KF_mse, mse
#fit Kalman filter regressor: take the slope column of the online regression
#of the smoothed prices, negated to form the hedge ratio
state_means = - KalmanFilterRegression(KalmanFilterAverage(pair_data.P1_Close),KalmanFilterAverage(pair_data.P2_Close))[:,0]
#normalizing the Kalman-implied spread P1 + hedge_ratio * P2
results = normalize(pair_data.P1_Close + (pair_data.P2_Close * state_means))
#forecast = the tail of the filtered spread aligned with the test window
forecast = results[-len(X_test):].values
#finding MSE for prediction on lookback 1
yhat_Kalman, yhat_Kalman_mse, mse_kalman = forecast_kalman_lookback(Y_test_unorm, forecast, look_back=1)
# print('Kalman Filters MSE: ', mse_kalman)
#plotting normalized spread and the Kalman-filter prediction
plt.figure(figsize = (15,8))
plt.plot(normalize(pair_data['Spread_Close']), label='True Value')
plt.plot(results, label='Prediction Kalman')  # fixed: label wrongly said "Arima"
plt.title('Spread true vs predicted -- Kalman Filters')  # fixed typo: "predictied"
plt.xlabel('timestep')
plt.ylabel('normalized spread')
plt.axhline(normalize(pair_data['Spread_Close']).mean(), color='black')
plt.axhline(1.0, color='red', linestyle='--')
plt.axhline(-1.0, color='green', linestyle='--')
plt.legend(['Spread z-score', 'Kalman_Predicted_Spread', 'Mean', '+1', '-1'])
plt.show()
After testing the data on the standard models, we move on to build our state-of-the-art Long Short-Term Memory (LSTM) neural network architecture.
LSTM models are capable of learning long-term dependencies. The architecture consists of 2 stacked LSTM layers followed by a fully connected layer, and it is used to predict the value of the spread 1 time step into the future.
#Creating the LSTM model
class lstm(Model):
    """Spread predictor: two stacked LSTM layers, dropout, and a dense
    head emitting ``look_back`` values per sample."""

    def __init__(self, in_shape, look_back):
        super(lstm, self).__init__()
        self.shape = in_shape
        self.look_back = look_back
        # first LSTM returns the full sequence so the second can be stacked on it
        self.lstm_l1 = LSTM(256, input_shape=self.shape, return_sequences=True)
        self.lstm_l2 = LSTM(256)
        self.dropout = Dropout(0.2)
        self.dense = Dense(self.look_back)

    def call(self, x):
        """Forward pass: LSTM stack -> dropout -> dense projection."""
        hidden = self.lstm_l1(x)
        hidden = self.lstm_l2(hidden)
        hidden = self.dropout(hidden)
        return self.dense(hidden)
# Reshape X for model training: (samples, features) -> (samples, timesteps=1, features)
trainX = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
valX = np.reshape(X_val, (X_val.shape[0], 1, X_val.shape[1]))
testX = np.reshape(X_test, (X_test.shape[0], 1, X_test.shape[1]))
#initializing the lstm model with input shape (timesteps, features), 1-step horizon
lstm_model = lstm((trainX.shape[1], trainX.shape[2]), look_back = 1)
# NOTE(review): 'accuracy' is a classification metric and carries no meaning
# for this mse regression objective; the loss curves are the signal to watch.
lstm_model.compile(loss='mse',
                   optimizer='adam',
                   metrics=['accuracy'])
#fitting the model (verbose=0: silent training; history keeps the loss curves)
history = lstm_model.fit(trainX,
                         Y_train,
                         epochs=400,
                         batch_size=100,
                         validation_data=(valX, Y_val),
                         verbose=0,
                         shuffle=True)
lstm_model.summary()
# Plot training/validation loss per epoch
plt.figure(figsize=(8,6))
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='val')
plt.title('Loss Plot -- LSTM')  # fixed typo: was "LSMT"
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()
# Predictions on X_test (still in the scaled/normalized space)
yhat = lstm_model.predict(testX)
plt.figure(figsize=(15,8))
plt.plot(Y_test, label='True Value')
plt.plot(yhat, label='Predicted Value -- LSTM')
plt.title('Spread true vs predicted -- LSTM')  # fixed typo: was "predictied"
plt.xlabel('timestep')
plt.ylabel('normalized spread')
plt.legend()
plt.show()
# Invert the scaling to get predictions/targets back in raw spread units
yhat_LSTM = scaler.inverse_transform(yhat)
Y_test_LSTM = scaler.inverse_transform(Y_test)
# Training and test error (MSE in unscaled units)
mse_LSTM_train = acc_metric(scaler.inverse_transform(Y_train), scaler.inverse_transform(lstm_model.predict(trainX)))
mse_LSTM_test = acc_metric(yhat_LSTM, Y_test_LSTM)
# print('LSTM MSE: ')
# print('Train-- ', mse_LSTM_train)
# print('Test-- ', mse_LSTM_test)
# Dates for the test window: the last len(testX) rows of pair_data
predictDates = pair_data.tail(len(testX)).index
# Normalize each series and flatten to 1-D for plotting
testY_reshape = normalize(Y_test_LSTM).reshape(len(Y_test_LSTM))
yhat_reshape = normalize(yhat_LSTM).reshape(len(yhat_LSTM))
kalman_reshape = normalize(yhat_Kalman).reshape(len(yhat_Kalman))
#Plot predicted and actual line graph (interactive plotly version)
layout = go.Layout(
    yaxis=dict(
        title='Spread',
        titlefont=dict(
            family='Arial, sans-serif',
            size=18
        )))
actual_chart = go.Scatter(x=predictDates, y=testY_reshape, name= 'Actual Spread')
predict_chart = go.Scatter(x=predictDates, y=yhat_reshape, name= 'LSTM Predict Spread')
predict_kalman_chart = go.Scatter(x=predictDates, y=kalman_reshape, name= 'Kalman Filter Predict Spread')
fig = go.Figure(data = [predict_kalman_chart, predict_chart, actual_chart], layout = layout)
fig.update_layout(
    title='Actual vs Predicted Spread')
py.iplot(fig)
The spread information is used to develop a simple trading strategy that determines when to buy or sell the stocks.
We look at the value of the normalized spread, and when the spread deviates significantly from its mean,
we take a short position in the overvalued stock and a long position in the undervalued stock.
# Test data frame for the trading strategy: closing prices of both legs plus
# the actual and model-predicted spreads over the test window
test_data = pd.DataFrame({'P1':pair_data['P1_Close'].iloc[-len(testX):],'P2':pair_data['P2_Close'].iloc[-len(testX):]})
test_data['Actual_Spread'] = pair_data['Spread_Close'].iloc[-len(testX):]
test_data['Kalman_Predicted_Spread'] = yhat_Kalman
test_data['ARIMA_Predicted_Spread'] = yhat_ARIMA
test_data['LSTM_Predicted_Spread'] = list(yhat_LSTM[:,0])
data = test_data['Actual_Spread']
# Short (5-day) and long (60-day) rolling statistics of the spread
moving_avg_5day= data.rolling(window=5,center=False).mean()
moving_avg_60day = data.rolling(window=60,center=False).mean()
std_60day = data.rolling(window=60,
                        center=False).std()
# z-score of the short MA relative to the long MA: the trading signal
zscore_60_5 = (moving_avg_5day - moving_avg_60day)/std_60day
plt.figure(figsize=(15,7))
plt.plot(data.index, data.values)
plt.plot(moving_avg_5day.index, moving_avg_5day.values)
plt.plot(moving_avg_60day.index, moving_avg_60day.values)
plt.legend(['Spread','5d Ratio Moving Avg', '60d Ratio Moving Avg'])
plt.ylabel('Spread')
plt.show()
# Plot the rolling z-score together with the +/-1 entry thresholds
plt.figure(figsize=(15,7))
zscore_60_5.plot()
plt.axhline(0, color='black')
plt.axhline(1.0, color='red', linestyle='--')
plt.axhline(-1.0, color='green', linestyle='--')
plt.legend(['Rolling Spread z-Score', 'Mean', '+1', '-1'])
plt.show()
# Plot the spread with buy and sell signals derived from the z score
plt.figure(figsize=(15,7))
data[60:].plot()
#separating buy sell signals based on z_scores; -100 is a "no signal"
#sentinel, pushed far off-scale so it is clipped out of the visible axis
buy = data.copy()
sell = data.copy()
buy[zscore_60_5>-1] = -100
sell[zscore_60_5<1] = -100
# NOTE(review): both signals use the '^' marker; a 'v' for sells would be conventional
buy[60:].plot(color='g', linestyle='None', marker='^')
sell[60:].plot(color='r', linestyle='None', marker='^')
x1,x2,y1,y2 = plt.axis()
# clamp y-limits to the spread's own range so sentinel points stay hidden
plt.axis((x1,x2,data.min(),data.max()))
plt.legend(['Spread', 'Buy Signal', 'Sell Signal'])
plt.show()
# Plot the prices and buy and sell signals from z score
plt.figure(figsize=(18,9))
P1 = test_data.P1
P2 = test_data.P2
P1[60:].plot(color='b')
P2[60:].plot(color='c')
# price-level signal series; 0 where there is no signal
buyR = 0*P1.copy()
sellR = 0*P1.copy()
# When buying the ratio, buy S1 and sell S2
buyR[buy!=-100] = P1[buy!=-100]
sellR[buy!=-100] = P2[buy!=-100]
# When selling the ratio, sell S1 and buy S2
buyR[sell!=-100] = P2[sell!=-100]
sellR[sell!=-100] = P1[sell!=-100]
buyR[60:].plot(color='g', linestyle='None', marker='^')
sellR[60:].plot(color='r', linestyle='None', marker='^')
x1,x2,y1,y2 = plt.axis()
# clamp y-limits to the price range so the zero "no signal" points are hidden
plt.axis((x1,x2,min(P1.min(),P2.min()),max(P1.max(),P2.max())))
plt.legend(['P1', 'P2', 'Buy Signal', 'Sell Signal'])
plt.show()
# Interactive (plotly) version of the price chart with buy/sell markers
P1 = test_data.P1
P2 = test_data.P2
buyR = 0*P1.copy()
sellR = 0*P1.copy()
# When buying the ratio, buy S1 and sell S2
buyR[buy!=-100] = P1[buy!=-100]
sellR[buy!=-100] = P2[buy!=-100]
# When selling the ratio, sell S1 and buy S2
buyR[sell!=-100] = P2[sell!=-100]
sellR[sell!=-100] = P1[sell!=-100]
fig = px.line(x=test_data[60:].index, y=[P1[60:].values, P2[60:].values],
              # fixed: title was copy-pasted from the LSTM spread chart
              title='Buy and Sell Signals on Closing Prices',
              labels={'index':'Date', 'value':'Closing Price'})
fig.add_scatter(x=buyR[60:].index, y=buyR[60:].values, mode='markers', marker_color='green',
                marker_symbol='triangle-up', marker_size=10)
fig.add_scatter(x=sellR[60:].index, y=sellR[60:].values, mode='markers', marker_color='red',
                marker_symbol='triangle-up', marker_size=10)
# names = cycle(['P1', 'P2', 'Buy Signal', 'Sell Signal'])
fig.update_traces(line=dict(width=2.5))
# fig.update_layout(legend_title="")
fig.update_yaxes(range=[min(P1.min(),P2.min()),max(P1.max(),P2.max())])
fig.show()
Estimating profit using the developed trading strategy
def trade(Stock1, Stock2, spread, window1, window2):
# If window length is 0, algorithm doesn't make sense, so exit
if (window1 == 0) or (window2 == 0):
return 0
# Compute rolling mean and rolling standard deviation
moving_avg1 = spread.rolling(window=window1, center=False).mean()
moving_avg2 = spread.rolling(window=window2, center=False).mean()
std = spread.rolling(window=window2, center=False).std()
zscore = (moving_avg1 - moving_avg2)/std
# Simulate trading
# Start with no money and no positions
money = 0
trade_count_S1 = 0
trade_count_S2 = 0
for i in range(len(spread)):
# Sell short if the z-score is > 1
if zscore[i] > 1:
money += Stock1[i] - Stock2[i] * spread[i]
trade_count_S1 -= 1
trade_count_S2 += spread[i]
# Buy long if the z-score is < 1
elif zscore[i] < -1:
money -= Stock1[i] - Stock2[i] * spread[i]
trade_count_S1 += 1
trade_count_S2 -= spread[i]
# Clear positions if the z-score between -.5 and .5
elif abs(zscore[i]) < 0.5:
money += (trade_count_S1 * Stock1[i]) - (Stock2[i] * trade_count_S2)
trade_count_S1 = 0
trade_count_S2 = 0
return money
# Run the strategy on the actual test spread.
# NOTE(review): trade(...) takes (window1=short, window2=long); (30, 5) inverts
# the 5/60 short/long convention used for the earlier z-score plots — confirm
# whether this ordering is intentional.
data = test_data['Actual_Spread']
profit = trade(test_data['P1'], test_data['P2'], data, 30, 5)
profit